package org.com.gr.capserver.config.mqtt.mqttCallBack;

import lombok.extern.slf4j.Slf4j;
import org.com.gr.capserver.config.mqtt.MqttClientManager;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

/**
 * ClassName: AbsMqttCallBack
 * Description:
 *
 * @author binbin_hao
 * @date 2023/9/8 10:36
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback, MqttCallbackExtended {
    //客户端名称
    private String clientName;
    private String topic;
    private MqttConnectOptions connectOptions;

    public String getClientName() {
        return clientName;
    }

    public void setClientName(String clientName) {
        this.clientName = clientName;
    }

    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }

    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("mqtt 意外断开，系统设置了自动重连，异常信息：{}",throwable.getMessage());
    }

    /**
     * 接收订阅消息
     *
     * @param s           主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        String payload = new String(mqttMessage.getPayload(), Charset.forName("UTF-8"));
        log.info("Receive topic[{}],message={}", s, payload);
        Message commonMessage=new Message() {
            @Override
            public Object getPayload() {
                return payload;
            }

            @Override
            public MessageHeaders getHeaders() {
                Map map=new HashMap();
                map.put(MqttHeaders.QOS,mqttMessage.getQos());
                map.put(MqttHeaders.RECEIVED_TOPIC,s);
                map.put(MqttHeaders.ID,mqttMessage.getId());
                map.put(MqttHeaders.TOPIC,s);
                map.put(MqttHeaders.RETAINED,mqttMessage.isRetained());
                map.put(MqttHeaders.DUPLICATE,mqttMessage.isDuplicate());
                MessageHeaders messageHeaders=new MessageHeaders(map);
                return messageHeaders;
            }
        };
        handleReceiveMessage(s, commonMessage);
    }

    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }


    /**
     * 处理接收的消息
     *
     * @param topic   主题
     * @param commonMessage 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, Message commonMessage);
    //这里面的逻辑是因为mqtt 在重连之后，订阅主题会丢失，这里面需要重新对主题进行订阅
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("重新连接成功后，重新订阅主题！");
        if (!reconnect) {
            log.info("{}连接成功",serverURI);
        }else{
            MqttClient mqttClient= MqttClientManager.getMqttClientById(this.getClientName());
            try {
                //这里在判断一下是不是连接成功，如果没有连接成功则在连接一下
                if (!mqttClient.isConnected()){
                    mqttClient.connect(this.getConnectOptions());
                }
                mqttClient.subscribe(this.getTopic());
            } catch (MqttException e) {
                e.printStackTrace();
                log.error("重新订阅失败");
            }
        }
    }
}
